package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.KeyFunction;
import cn.itcast.function.SecIndexHbaseFunction;
import cn.itcast.function.SecIndexWindowFunction;
import cn.itcast.inter.ProcessDataInterface;
import cn.itcast.sink.SinkHbase;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class IndexSecTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> waterData) {
        waterData.keyBy(new KeyFunction())
                .timeWindow(Time.seconds(5))
                .apply(new SecIndexWindowFunction())
                .timeWindowAll(Time.seconds(5))
                .apply(new SecIndexHbaseFunction())
                .addSink(new SinkHbase(QuotConfig.config.getProperty("index.hbase.table.name")));
    }
}
